由于还不支持numpy,目前notebook中的 "Part B: Encrypted Aggregation" 无法正常使用。

如需查看状态,请访问 https://github.com/OpenMined/PySyft/issues/2771.

只要B部分恢复正常,这段话就会被移除。

Part 10: 安全聚合的联邦学习

在最后几节中,我们通过构建几个简单的程序来学习加密计算。 在本节中,我们将返回到第4部分的联合学习演示,我们在那里有一个“受信任的聚合器”,负责平均多个工作机的模型更新。

现在,我们将使用新的工具进行加密计算,以删除此受信任的聚合器,因为它不理想,因为它假定我们可以找到足够值得信任的人来访问此敏感信息。这并非总是如此。

因此,在这个笔记本中,我们将展示如何使用SMPC来执行安全聚合,这样我们就不需要“受信任的聚合器”。

作者:

中文版译者:

1: 普通的联邦学习

首先,这是一些代码,用于在Boston Housing Dataset上执行经典的联邦学习。这部分代码分为几个部分。

配置


In [ ]:
import pickle

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader

class Parser:
    """Parameters for training"""
    def __init__(self):
        self.epochs = 10
        self.lr = 0.001
        self.test_batch_size = 8
        self.batch_size = 8
        self.log_interval = 10
        self.seed = 1
    
args = Parser()

torch.manual_seed(args.seed)
kwargs = {}

加载数据集


In [ ]:
with open('../data/BostonHousing/boston_housing.pickle','rb') as f:
    ((X, y), (X_test, y_test)) = pickle.load(f)

X = torch.from_numpy(X).float()
y = torch.from_numpy(y).float()
X_test = torch.from_numpy(X_test).float()
y_test = torch.from_numpy(y_test).float()
# preprocessing
mean = X.mean(0, keepdim=True)
dev = X.std(0, keepdim=True)
mean[:, 3] = 0. # the feature at column 3 is binary,
dev[:, 3] = 1.  # so we don't standardize it
X = (X - mean) / dev
X_test = (X_test - mean) / dev
train = TensorDataset(X, y)
test = TensorDataset(X_test, y_test)
train_loader = DataLoader(train, batch_size=args.batch_size, shuffle=True, **kwargs)
test_loader = DataLoader(test, batch_size=args.test_batch_size, shuffle=True, **kwargs)

神经网络结构


In [ ]:
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(13, 32)
        self.fc2 = nn.Linear(32, 24)
        self.fc3 = nn.Linear(24, 1)

    def forward(self, x):
        x = x.view(-1, 13)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

model = Net()
optimizer = optim.SGD(model.parameters(), lr=args.lr)

挂钩PyTorch


In [ ]:
import syft as sy

hook = sy.TorchHook(torch)
bob = sy.VirtualWorker(hook, id="bob")
alice = sy.VirtualWorker(hook, id="alice")
james = sy.VirtualWorker(hook, id="james")

compute_nodes = [bob, alice]

将数据发送给工作机 通常工作机已经拥有了数据,这只是出于演示目的,我们选择手动发送


In [ ]:
train_distributed_dataset = []

for batch_idx, (data,target) in enumerate(train_loader):
    data = data.send(compute_nodes[batch_idx % len(compute_nodes)])
    target = target.send(compute_nodes[batch_idx % len(compute_nodes)])
    train_distributed_dataset.append((data, target))

训练函数


In [ ]:
def train(epoch):
    model.train()
    for batch_idx, (data,target) in enumerate(train_distributed_dataset):
        worker = data.location
        model.send(worker)

        optimizer.zero_grad()
        # update the model
        pred = model(data)
        loss = F.mse_loss(pred.view(-1), target)
        loss.backward()
        optimizer.step()
        model.get()
            
        if batch_idx % args.log_interval == 0:
            loss = loss.get()
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * data.shape[0], len(train_loader),
                       100. * batch_idx / len(train_loader), loss.item()))

测试函数


In [ ]:
def test():
    model.eval()
    test_loss = 0
    for data, target in test_loader:
        output = model(data)
        test_loss += F.mse_loss(output.view(-1), target, reduction='sum').item() # sum up batch loss
        pred = output.data.max(1, keepdim=True)[1] # get the index of the max log-probability
        
    test_loss /= len(test_loader.dataset)
    print('\nTest set: Average loss: {:.4f}\n'.format(test_loss))

训练模型


In [ ]:
import time

In [ ]:
t = time.time()

for epoch in range(1, args.epochs + 1):
    train(epoch)

    
total_time = time.time() - t
print('Total', round(total_time, 2), 's')

计算性能


In [ ]:
test()

2: 添加加密聚合

现在,我们将略微修改此示例,以使用加密来聚合梯度。 不同的主要部分实际上是train()函数中的1或2行代码,我们将指出。目前,让我们重新处理数据并初始化bob和alice的模型。


In [ ]:
remote_dataset = (list(),list())

train_distributed_dataset = []

for batch_idx, (data,target) in enumerate(train_loader):
    data = data.send(compute_nodes[batch_idx % len(compute_nodes)])
    target = target.send(compute_nodes[batch_idx % len(compute_nodes)])
    remote_dataset[batch_idx % len(compute_nodes)].append((data, target))

def update(data, target, model, optimizer):
    model.send(data.location)
    optimizer.zero_grad()
    pred = model(data)
    loss = F.mse_loss(pred.view(-1), target)
    loss.backward()
    optimizer.step()
    return model

bobs_model = Net()
alices_model = Net()

bobs_optimizer = optim.SGD(bobs_model.parameters(), lr=args.lr)
alices_optimizer = optim.SGD(alices_model.parameters(), lr=args.lr)

models = [bobs_model, alices_model]
params = [list(bobs_model.parameters()), list(alices_model.parameters())]
optimizers = [bobs_optimizer, alices_optimizer]

建立我们的训练逻辑

唯一的真正的差异是此训练方法的内部。让我们逐步讲解

A: 训练:


In [ ]:
# 选择训练哪个batch
data_index = 0
# 更新远程模型
# 我们可以在进行此操作之前对其进行多次迭代,但是在这里每个工人只进行一次迭代
for remote_index in range(len(compute_nodes)):
    data, target = remote_dataset[remote_index][data_index]
    models[remote_index] = update(data, target, models[remote_index], optimizers[remote_index])

B: 加密聚合


In [ ]:
# 创建一个列表,我们将在其中存储我们的加密模型平均值
new_params = list()

In [ ]:
# 遍历每个参数
for param_i in range(len(params[0])):

    # 对每个工作机
    spdz_params = list()
    for remote_index in range(len(compute_nodes)):
        
        # 从每个工作机中选择相同的参数并复制
        copy_of_parameter = params[remote_index][param_i].copy()
        
        
        # 由于SMPC只能使用整数(不能使用浮点数)
        # 因此我们需要使用Integers存储十进制信息。
        # 换句话说,我们需要使用“固定精度”编码。
        fixed_precision_param = copy_of_parameter.fix_precision()
        
        # 现在我们在远程计算机上对其进行加密。
        # 注意,fixed_precision_param“已经”是一个指针。
        # 因此,当我们调用share时,它实际上是对指向的数据进行加密。
        # 而它会返回一个指向MPC秘密共享对象的指针,也就是我们需要的共享分片。
        
        encrypted_param = fixed_precision_param.share(bob, alice, crypto_provider=james)
        
        # 现在我们获取指向MPC共享值的指针
        param = encrypted_param.get()
        
        # 保存参数,以便我们可以使用工作机的相同参数取平均值
        spdz_params.append(param)

    # 来自多个工作人员的平均参数,将它们提取到本地计算机上
    # 以固定精度解密和解码,再返回一个浮点数
    new_param = (spdz_params[0] + spdz_params[1]).get().float_precision()/2
    
    # 保存新的平均参数
    new_params.append(new_param)

C: 清理


In [ ]:
with torch.no_grad():
    for model in params:
        for param in model:
            param *= 0

    for model in models:
        model.get()

    for remote_index in range(len(compute_nodes)):
        for param_index in range(len(params[remote_index])):
            params[remote_index][param_index].set_(new_params[param_index])

把它们放在一起!!

现在我们知道了每个步骤,我们可以将所有步骤放到一个训练循环中!


In [ ]:
def train(epoch):
    for data_index in range(len(remote_dataset[0])-1):
        # update remote models
        for remote_index in range(len(compute_nodes)):
            data, target = remote_dataset[remote_index][data_index]
            models[remote_index] = update(data, target, models[remote_index], optimizers[remote_index])

        # encrypted aggregation
        new_params = list()
        for param_i in range(len(params[0])):
            spdz_params = list()
            for remote_index in range(len(compute_nodes)):
                spdz_params.append(params[remote_index][param_i].copy().fix_precision().share(bob, alice, crypto_provider=james).get())

            new_param = (spdz_params[0] + spdz_params[1]).get().float_precision()/2
            new_params.append(new_param)

        # cleanup
        with torch.no_grad():
            for model in params:
                for param in model:
                    param *= 0

            for model in models:
                model.get()

            for remote_index in range(len(compute_nodes)):
                for param_index in range(len(params[remote_index])):
                    params[remote_index][param_index].set_(new_params[param_index])

In [ ]:
def test():
    models[0].eval()
    test_loss = 0
    for data, target in test_loader:
        output = models[0](data)
        test_loss += F.mse_loss(output.view(-1), target, reduction='sum').item() # sum up batch loss
        pred = output.data.max(1, keepdim=True)[1] # get the index of the max log-probability
        
    test_loss /= len(test_loader.dataset)
    print('Test set: Average loss: {:.4f}\n'.format(test_loss))

In [ ]:
t = time.time()

for epoch in range(args.epochs):
    print(f"Epoch {epoch + 1}")
    train(epoch)
    test()

    
total_time = time.time() - t
print('Total', round(total_time, 2), 's')

恭喜!!! 是时候加入社区了!

祝贺您完成本笔记本教程! 如果您喜欢此方法,并希望加入保护隐私、去中心化AI和AI供应链(数据)所有权的运动,则可以通过以下方式做到这一点!

给 PySyft 加星

帮助我们的社区的最简单方法是仅通过给GitHub存储库加注星标! 这有助于提高人们对我们正在构建的出色工具的认识。

加入我们的 Slack!

保持最新进展的最佳方法是加入我们的社区! 您可以通过填写以下表格来做到这一点http://slack.openmined.org

加入代码项目!

对我们的社区做出贡献的最好方法是成为代码贡献者! 您随时可以转到PySyft GitHub的Issue页面并过滤“projects”。这将向您显示所有概述,选择您可以加入的项目!如果您不想加入项目,但是想做一些编码,则还可以通过搜索标记为“good first issue”的GitHub问题来寻找更多的“一次性”微型项目。

捐赠

如果您没有时间为我们的代码库做贡献,但仍想提供支持,那么您也可以成为Open Collective的支持者。所有捐款都将用于我们的网络托管和其他社区支出,例如黑客马拉松和聚会!

OpenMined's Open Collective Page


In [ ]: